23. 缓冲信道和工作池(Buffered Channels and Worker Pools)
什么是缓冲信道?
在[上一教程]里,我们讨论的主要是无缓冲信道。我们在[信道]的教程里详细讨论了,无缓冲信道的发送和接收过程是阻塞的。
我们还可以创建一个有缓冲(Buffer)的信道。只在缓冲已满的情况,才会阻塞向缓冲信道(Buffered Channel)发送数据。同样,只有在缓冲为空的时候,才会阻塞从缓冲信道接收数据。
通过向 make
函数再传递一个表示容量的参数(指定缓冲的大小),可以创建缓冲信道。
1 | ch := make(chan type, capacity) |
要让一个信道有缓冲,上面语法中的 capacity
应该大于 0。无缓冲信道的容量默认为 0,因此我们在[上一教程]创建信道时,省略了容量参数。
我们开始编写代码,创建一个缓冲信道。
示例一
1 | package main |
在上面程序里的第 9 行,我们创建了一个缓冲信道,其容量为 2。由于该信道的容量为 2,因此可向它写入两个字符串,而且不会发生阻塞。在第 10 行和第 11 行,我们向信道写入两个字符串,该信道并没有发生阻塞。我们又在第 12 行和第 13 行分别读取了这两个字符串。该程序输出:
1 | naveen |
示例二
我们再看一个缓冲信道的示例,其中有一个并发的 Go 协程来向信道写入数据,而 Go 主协程负责读取数据。该示例帮助我们进一步理解,在向缓冲信道写入数据时,什么时候会发生阻塞。
1 | package main |
在上面的程序中,第 16 行在 Go 主协程中创建了容量为 2 的缓冲信道 ch
,而第 17 行把 ch
传递给了 write
协程。接下来 Go 主协程休眠了两秒。在这期间,write
协程在并发地运行。write
协程有一个 for 循环,依次向信道 ch
写入 0~4。而缓冲信道的容量为 2,因此 write
协程里立即会向 ch
写入 0 和 1,接下来发生阻塞,直到 ch
内的值被读取。因此,该程序立即打印出下面两行:
1 | successfully wrote 0 to ch |
打印上面两行之后,write
协程中向 ch
的写入发生了阻塞,直到 ch
有值被读取到。而 Go 主协程休眠了两秒后,才开始读取该信道,因此在休眠期间程序不会打印任何结果。主协程结束休眠后,在第 19 行使用 for range 循环,开始读取信道 ch
,打印出了读取到的值后又休眠两秒,这个循环一直到 ch
关闭才结束。所以该程序在两秒后会打印下面两行:
1 | read value 0 from ch |
该过程会一直进行,直到信道读取完所有的值,并在 write
协程中关闭信道。最终输出如下:
1 | successfully wrote 0 to ch |
死锁
1 | package main |
在上面程序里,我们向容量为 2 的缓冲信道写入 3 个字符串。当在程序控制到达第 3 次写入时(第 11 行),由于它超出了信道的容量,因此这次写入发生了阻塞。现在想要这次写操作能够进行下去,必须要有其它协程来读取这个信道的数据。但在本例中,并没有并发协程来读取这个信道,因此这里会发生死锁(deadlock)。程序会在运行时触发 panic,信息如下:
1 | fatal error: all goroutines are asleep - deadlock! |
长度 vs 容量
缓冲信道的容量是指信道可以存储的值的数量。我们在使用 make
函数创建缓冲信道的时候会指定容量大小。
缓冲信道的长度是指信道中当前排队的元素个数。
代码可以把一切解释得很清楚。:)
1 | package main |
在上面的程序里,我们创建了一个容量为 3 的信道,于是它可以保存 3 个字符串。接下来,我们分别在第 9 行和第 10 行向信道写入了两个字符串。于是信道有两个字符串排队,因此其长度为 2。在第 13 行,我们又从信道读取了一个字符串。现在该信道内只有一个字符串,因此其长度变为 1。该程序会输出:
1 | capacity is 3 |
WaitGroup
在本教程的下一节里,我们会讲到工作池(Worker Pools)。而 WaitGroup
用于实现工作池,因此要理解工作池,我们首先需要学习 WaitGroup
。
WaitGroup
用于等待一批 Go 协程执行结束。程序控制会一直阻塞,直到这些协程全部执行完毕。假设我们有 3 个并发执行的 Go 协程(由 Go 主协程生成)。Go 主协程需要等待这 3 个协程执行结束后,才会终止。这就可以用 WaitGroup
来实现。
理论说完了,我们编写点儿代码吧。:)
1 | package main |
[WaitGroup]是一个结构体类型,我们在第 18 行创建了 WaitGroup
类型的变量,其初始值为零值。WaitGroup
使用计数器来工作。当我们调用 WaitGroup
的 Add
并传递一个 int
时,WaitGroup
的计数器会加上 Add
的传参。要减少计数器,可以调用 WaitGroup
的 Done()
方法。Wait()
方法会阻塞调用它的 Go 协程,直到计数器变为 0 后才会停止阻塞。
上述程序里,for 循环迭代了 3 次,我们在循环内调用了 wg.Add(1)
(第 20 行)。因此计数器变为 3。for 循环同样创建了 3 个 process
协程,然后在第 23 行调用了 wg.Wait()
,确保 Go 主协程等待计数器变为 0。在第 13 行,process
协程内调用了 wg.Done
,可以让计数器递减。一旦 3 个子协程都执行完毕(即 wg.Done()
调用了 3 次),那么计数器就变为 0,于是主协程会解除阻塞。
在第 21 行里,传递 wg 的地址是很重要的。如果没有传递 wg 的地址,那么每个 Go 协程将会得到一个 WaitGroup 值的拷贝,因而当它们执行结束时,main 函数并不会知道。
该程序输出:
1 | started Goroutine 2 |
由于 Go 协程的执行顺序不一定,因此你的输出可能和我不一样。:)
工作池的实现
缓冲信道的重要应用之一就是实现[工作池]。
一般而言,工作池就是一组等待任务分配的线程。一旦完成了所分配的任务,这些线程可继续等待任务的分配。
我们会使用缓冲信道来实现工作池。我们工作池的任务是计算所输入数字的每一位的和。例如,如果输入 234,结果会是 9(即 2 + 3 + 4)。向工作池输入的是一列伪随机数。
我们工作池的核心功能如下:
- 创建一个 Go 协程池,监听一个等待作业分配的输入型缓冲信道。
- 将作业添加到该输入型缓冲信道中。
- 作业完成后,再将结果写入一个输出型缓冲信道。
- 从输出型缓冲信道读取并打印结果。
我们会逐步编写这个程序,让代码易于理解。
第一步就是创建一个结构体,表示作业和结果。
1 | type Job struct { |
所有 Job
结构体变量都会有 id
和 randomno
两个字段,randomno
用于计算其每位数之和。
而 Result
结构体有一个 job
字段,表示所对应的作业,还有一个 sumofdigits
字段,表示计算的结果(每位数字之和)。
第二步是分别创建用于接收作业和写入结果的缓冲信道。
1 | var jobs = make(chan Job, 10) |
工作协程(Worker Goroutine)会监听缓冲信道 jobs
里更新的作业。一旦工作协程完成了作业,其结果会写入缓冲信道 results
。
如下所示,digits
函数的任务实际上就是计算整数的每一位之和,最后返回该结果。为了模拟出 digits
在计算过程中花费了一段时间,我们在函数内添加了两秒的休眠时间。
1 | func digits(number int) int { |
然后,我们写一个创建工作协程的函数。
1 | func worker(wg *sync.WaitGroup) { |
上面的函数创建了一个工作者(Worker),读取 jobs
信道的数据,根据当前的 job
和 digits
函数的返回值,创建了一个 Result
结构体变量,然后将结果写入 results
缓冲信道。worker
函数接收了一个 WaitGroup
类型的 wg
作为参数,当所有的 jobs
完成的时候,调用了 Done()
方法。
createWorkerPool
函数创建了一个 Go 协程的工作池。
1 | func createWorkerPool(noOfWorkers int) { |
上面函数的参数是需要创建的工作协程的数量。在创建 Go 协程之前,它调用了 wg.Add(1)
方法,于是 WaitGroup
计数器递增。接下来,我们创建工作协程,并向 worker
函数传递 wg
的地址。创建了需要的工作协程后,函数调用 wg.Wait()
,等待所有的 Go 协程执行完毕。所有协程完成执行之后,函数会关闭 results
信道。因为所有协程都已经执行完毕,于是不再需要向 results
信道写入数据了。
现在我们已经有了工作池,我们继续编写一个函数,把作业分配给工作者。
1 | func allocate(noOfJobs int) { |
上面的 allocate
函数接收所需创建的作业数量作为输入参数,生成了最大值为 998 的伪随机数,并使用该随机数创建了 Job
结构体变量。这个函数把 for 循环的计数器 i
作为 id,最后把创建的结构体变量写入 jobs
信道。当写入所有的 job
时,它关闭了 jobs
信道。
下一步是创建一个读取 results
信道和打印输出的函数。
1 | func result(done chan bool) { |
result
函数读取 results
信道,并打印出 job
的 id
、输入的随机数、该随机数的每位数之和。result
函数也接受 done
信道作为参数,当打印所有结果时,done
会被写入 true。
现在一切准备充分了。我们继续完成最后一步,在 main()
函数中调用上面所有的函数。
1 | func main() { |
我们首先在 main
函数的第 2 行,保存了程序的起始时间,并在最后一行(第 12 行)计算了 endTime
和 startTime
的差值,显示出程序运行的总时间。由于我们想要通过改变协程数量,来做一点基准指标(Benchmark),所以需要这么做。
我们把 noOfJobs
设置为 100,接下来调用了 allocate
,向 jobs
信道添加作业。
我们创建了 done
信道,并将其传递给 result
协程。于是该协程会开始打印结果,并在完成打印时发出通知。
通过调用 createWorkerPool
函数,我们最终创建了一个有 10 个协程的工作池。main
函数会监听 done
信道的通知,等待所有结果打印结束。
为了便于参考,下面是整个程序。我还引用了必要的包。
1 | package main |
为了更精确地计算总时间,请在你的本地机器上运行该程序。
该程序输出:
1 | Job id 1, input random no 636, sum of digits 15 |
程序总共会打印 100 行,对应着 100 项作业,然后最后会打印一行程序消耗的总时间。你的输出会和我的不同,因为 Go 协程的运行顺序不一定,同样总时间也会因为硬件而不同。在我的例子中,运行程序大约花费了 20 秒。
现在我们把 main
函数里的 noOfWorkers
增加到 20。我们把工作者的数量加倍了。由于工作协程增加了(准确说来是两倍),因此程序花费的总时间会减少(准确说来是一半)。在我的例子里,程序会打印出 10.004364685 秒。
1 | ... |
现在我们可以理解了,随着工作协程数量增加,完成作业的总时间会减少。你们可以练习一下:在 main
函数里修改 noOfJobs
和 noOfWorkers
的值,并试着去分析一下结果。